package scanner

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"buf.build/gen/go/safedep/api/grpc/go/safedep/services/malysis/v1/malysisv1grpc"
	malysisv1pb "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/messages/malysis/v1"
	packagev1 "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/messages/package/v1"
	malysisv1 "buf.build/gen/go/safedep/api/protocolbuffers/go/safedep/services/malysis/v1"
	"github.com/safedep/dry/adapters"
	"github.com/safedep/vet/pkg/common/logger"
	"github.com/safedep/vet/pkg/common/utils"
	"github.com/safedep/vet/pkg/models"
	"google.golang.org/grpc"
)

// errMalysisPollRetryInFuture is the sentinel error a query worker pushes on
// the results channel when it dequeues a request that is still inside its
// backoff window. The result worker treats it like any other error and
// re-schedules the request.
var errMalysisPollRetryInFuture = errors.New("retry in future")

// MalysisMalwareEnricherConfig holds the tunables of the Malysis malware
// enricher: its overall lifetime, the per-call gRPC timeout and the polling
// behaviour of the query workers.
type MalysisMalwareEnricherConfig struct {
	// Timeout for the enricher starting from the time of initialization.
	// This includes the time taken to submit the package for analysis and
	// waiting for the analysis to complete.
	Timeout time.Duration

	// Timeout for the gRPC operation to the Malysis service. This timeout
	// is for a single gRPC operation.
	GrpcOperationTimeout time.Duration

	// Number of workers to poll the analysis results
	QueryWorkerCount int

	// Maximum number of retries for querying the analysis results for a given
	// analysis identifier.
	MaxQueryRetries int
}

// DefaultMalysisMalwareEnricherConfig returns the stock configuration: a
// 5 minute overall timeout, a 10 second timeout per gRPC operation, 10 query
// workers and at most 10 query retries per analysis.
func DefaultMalysisMalwareEnricherConfig() MalysisMalwareEnricherConfig {
	var cfg MalysisMalwareEnricherConfig

	cfg.Timeout = 5 * time.Minute
	cfg.GrpcOperationTimeout = 10 * time.Second
	cfg.QueryWorkerCount = 10
	cfg.MaxQueryRetries = 10

	return cfg
}

// malysisMalwareEnricher submits packages to the Malysis service for malware
// analysis and asynchronously polls for the reports, attaching them to the
// submitted packages once available.
type malysisMalwareEnricher struct {
	// gRPC connection supplied by the caller and the Malysis client built
	// on top of it.
	cc     *grpc.ClientConn
	client malysisv1grpc.MalwareAnalysisServiceClient
	config MalysisMalwareEnricherConfig

	// Used to resolve GitHub Actions versions to commit hashes before
	// submission.
	githubClient *adapters.GithubClient

	// Cache of analysis identifiers to poll and apply the results
	// to all packages that were submitted for analysis. Malysis internally
	// maintains a cache for analysis result for a given package. It will
	// not re-analyse the package if it has already been analysed or submitted
	// for analysis.
	queryCache map[string][]*models.Package
	// Guards queryCache.
	qcLock sync.Mutex

	// Channel to submit analysis identifiers to the query worker
	queryChannel chan *analysisQueryRequest

	// Channel to push results by the query worker
	resultsChannel chan *analysisQueryResult

	// Wait group to synchronize between submissions and completions
	wg sync.WaitGroup

	// Context bounding the enricher's lifetime. It is created with
	// config.Timeout by the constructor and observed by Wait and by the
	// deferred enqueue goroutines.
	ctx context.Context
}

// Compile-time assertion that *malysisMalwareEnricher satisfies PackageMetaEnricher.
var _ PackageMetaEnricher = (*malysisMalwareEnricher)(nil)

// analysisSubmissionResult is the outcome of submitting a package for
// analysis.
type analysisSubmissionResult struct {
	// Analysis identifier taken from the AnalyzePackage response; used as the
	// key for polling the report.
	analysisId string
}

// analysisQueryRequest is a unit of polling work passed to the query workers.
type analysisQueryRequest struct {
	// Identifier of the analysis whose report should be fetched.
	analysisId string
	// Number of times the request has been queued for immediate processing;
	// compared against MaxQueryRetries by the result worker.
	retryCount int
	// Earliest time at which the request should actually be sent to the
	// service; before this instant the request is considered to be in backoff.
	nextRetryAt time.Time
}

// analysisQueryResult is what a query worker hands to the result worker after
// processing an analysisQueryRequest.
type analysisQueryResult struct {
	// The request this result belongs to; re-used when the query is retried.
	req *analysisQueryRequest
	// Response of the GetAnalysisReport call; left nil when the worker
	// skipped the call because the request was still in backoff.
	response *malysisv1.GetAnalysisReportResponse
	// Error of the GetAnalysisReport call, or errMalysisPollRetryInFuture
	// when the request was still in backoff.
	err error
}

// NewMalysisMalwareEnricher creates a malware enricher that talks to the
// Malysis service over cc and starts its background result and query workers.
//
// The enricher's lifetime is bounded by config.Timeout, measured from this
// call: once it elapses the workers stop and Wait returns the context error.
// gha is used to resolve GitHub Actions versions to commit hashes before
// submission.
//
// An error is returned when cc or gha is nil, or when a worker fails to start.
func NewMalysisMalwareEnricher(cc *grpc.ClientConn,
	gha *adapters.GithubClient,
	config MalysisMalwareEnricherConfig,
) (*malysisMalwareEnricher, error) {
	if cc == nil {
		return nil, errors.New("grpc client connection is required")
	}

	if gha == nil {
		return nil, errors.New("github client is required")
	}

	ctx, cancelFn := context.WithTimeout(context.Background(), config.Timeout)

	// Make sure cancelFn is eventually invoked on the success path as well,
	// shortly after the deadline. time.AfterFunc already runs the callback on
	// its own goroutine, so it does not need to be wrapped in one.
	time.AfterFunc(config.Timeout+(500*time.Millisecond), cancelFn)

	client := malysisv1grpc.NewMalwareAnalysisServiceClient(cc)
	enricher := &malysisMalwareEnricher{
		cc:             cc,
		githubClient:   gha,
		client:         client,
		config:         config,
		queryCache:     make(map[string][]*models.Package),
		queryChannel:   make(chan *analysisQueryRequest, 10000),
		resultsChannel: make(chan *analysisQueryResult, 10000),
		ctx:            ctx,
	}

	// The result worker is started first so that results produced by the
	// query workers always have a consumer.
	if err := enricher.startResultWorker(ctx); err != nil {
		cancelFn()
		return nil, fmt.Errorf("failed to start result worker: %w", err)
	}

	if err := enricher.startQueryWorker(ctx); err != nil {
		// Cancelling also stops the already running result worker.
		cancelFn()
		return nil, fmt.Errorf("failed to start query worker: %w", err)
	}

	return enricher, nil
}

// Name returns the human readable name of this enricher.
func (e *malysisMalwareEnricher) Name() string {
	const enricherName = "Malysis Malware Enricher"

	return enricherName
}

// Enrich submits pkg to the Malysis service and registers it for result
// polling. It does not wait for the analysis itself; use Wait for that.
//
// We will submit all packages for analysis because our backend has caching.
// This will ensure that eventually the packages will get analysed and
// subsequent scans will have better coverage in terms of malware analysis.
//
// The dependency callback is not used. An error is returned only when the
// submission fails.
func (e *malysisMalwareEnricher) Enrich(pkg *models.Package,
	_ PackageDependencyCallbackFn,
) error {
	// Submit for analysis
	res, err := e.submitPackageForAnalysis(pkg)
	if err != nil {
		return fmt.Errorf("failed to submit package for analysis: %w", err)
	}

	// Account for the pending enrichment; the result worker calls Done once
	// polling for this request has finished.
	e.wg.Add(1)

	// Record the package against its analysis identifier so that the result
	// worker can apply the report to all packages that were submitted.
	// Appending to the nil slice of a missing key is fine, so the entry does
	// not need to be initialised first.
	e.qcLock.Lock()
	e.queryCache[res.analysisId] = append(e.queryCache[res.analysisId], pkg)
	e.qcLock.Unlock()

	// Enqueue only after the lock has been released: the send may block when
	// the query channel is full, and the result worker needs qcLock to make
	// progress.
	e.enqueueAnalysisForQuery(&analysisQueryRequest{analysisId: res.analysisId})

	return nil
}

// Wait blocks until every enrichment registered through Enrich has been
// resolved, in which case it returns nil, or until the enricher context is
// done, in which case it returns the context error.
func (e *malysisMalwareEnricher) Wait() error {
	drained := make(chan struct{})

	// Bridge the wait group to a channel so it can take part in the select.
	go func() {
		e.wg.Wait()
		close(drained)
	}()

	select {
	case <-drained:
		return nil
	case <-e.ctx.Done():
		return e.ctx.Err()
	}
}

// enqueueAnalysisForQuery hands req to the query workers.
//
// When req is still inside its backoff window the hand-off is deferred to a
// goroutine that sleeps until shortly after req.nextRetryAt. Otherwise the
// retry counter is incremented, the next backoff window (5 seconds multiplied
// by the retry count) is recorded on req and the request is queued
// immediately.
//
// Sends give up once the enricher context is done: the workers stop consuming
// at that point, so a plain blocking send could hang the caller or leak the
// deferred goroutine when the channel is full.
func (e *malysisMalwareEnricher) enqueueAnalysisForQuery(req *analysisQueryRequest) {
	if time.Now().Before(req.nextRetryAt) {
		// We are creating unbounded go routines here. This is fine because
		// the number of retries are limited and every go routine exits once
		// its timer has fired or the context is done.
		go func(ctx context.Context) {
			timer := time.NewTimer(time.Until(req.nextRetryAt.Add(100 * time.Millisecond)))
			defer timer.Stop()

			select {
			case <-ctx.Done():
				return
			case <-timer.C:
			}

			select {
			case <-ctx.Done():
			case e.queryChannel <- req:
			}
		}(e.ctx)

		return
	}

	req.retryCount++
	req.nextRetryAt = time.Now().Add(5 * time.Second * time.Duration(req.retryCount))

	select {
	case <-e.ctx.Done():
	case e.queryChannel <- req:
	}
}

// submitPackageForAnalysis sends pkg to the AnalyzePackage RPC of the Malysis
// service and returns the analysis identifier from the response.
//
// For GitHub Actions packages the version is first resolved to a commit hash
// through the GitHub client and the hash is submitted instead. The GitHub
// lookup and the RPC share a single context bounded by
// config.GrpcOperationTimeout.
func (e *malysisMalwareEnricher) submitPackageForAnalysis(pkg *models.Package) (*analysisSubmissionResult, error) {
	logger.Infof("[Malware Analysis] Submitting package for malware analysis: %s/%s/%s",
		pkg.Manifest.Ecosystem, pkg.PackageDetails.Name, pkg.PackageDetails.Version)

	target := &packagev1.PackageVersion{
		Package: &packagev1.Package{
			Ecosystem: pkg.GetControlTowerSpecEcosystem(),
			Name:      pkg.GetName(),
		},
		Version: pkg.GetVersion(),
	}

	opCtx, cancelOp := context.WithTimeout(context.Background(), e.config.GrpcOperationTimeout)
	defer cancelOp()

	// This is an exception case, we cannot use the version as is because GitHub Actions
	// tags, branches are mutable and can change. Hence we pin the request to the
	// commit hash the version currently points at.
	if target.GetPackage().GetEcosystem() == packagev1.Ecosystem_ECOSYSTEM_GITHUB_ACTIONS {
		logger.Debugf("[Malware Analysis] Resolving commit hash for GitHub Actions package: %s/%s",
			pkg.GetName(), pkg.GetVersion())

		segments := strings.Split(pkg.GetName(), "/")
		if len(segments) != 2 {
			return nil, fmt.Errorf("invalid package name: %s for GitHub Actions - should be in the format <owner>/<repo>", pkg.GetName())
		}

		owner, repo := segments[0], segments[1]

		sha, err := utils.ResolveGitHubRepositoryCommitSHA(opCtx, e.githubClient, owner, repo, pkg.GetVersion())
		if err != nil {
			return nil, fmt.Errorf("failed to resolve commit hash for GitHub Actions package: %w", err)
		}

		logger.Debugf("[Malware Analysis] Resolved commit hash for GitHub Actions package: %s/%s@%s",
			owner, repo, sha)

		// Patch the version in the request
		target.Version = sha
	}

	submission, err := e.client.AnalyzePackage(opCtx, &malysisv1.AnalyzePackageRequest{
		Target: &malysisv1pb.PackageAnalysisTarget{
			PackageVersion: target,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to submit package for analysis: %w", err)
	}

	return &analysisSubmissionResult{analysisId: submission.AnalysisId}, nil
}

// startResultWorker launches the goroutine that consumes the results pushed by
// the query workers. It runs until ctx is done or the results channel is
// closed, and always returns nil.
//
// For every result the worker either gives up (analysis failed at the
// service, or retries exhausted), re-submits the request for another query, or
// applies the report to all packages registered for the analysis.
func (e *malysisMalwareEnricher) startResultWorker(ctx context.Context) error {
	// applyReport attaches the report to every package registered under the
	// request's analysis identifier. No further retry happens after this
	// point, so the pending enrichment is always marked as done.
	applyReport := func(request *analysisQueryRequest, report *malysisv1.GetAnalysisReportResponse) {
		defer e.wg.Done()

		e.qcLock.Lock()
		defer e.qcLock.Unlock()

		switch {
		case report == nil:
			logger.Errorf("[Malware Analysis] Empty response for analysis: %s", request.analysisId)
			return
		case report.GetReport() == nil:
			logger.Errorf("[Malware Analysis] Empty report for analysis: %s", request.analysisId)
			return
		}

		// Ranging over a missing cache entry is a no-op.
		for _, pkg := range e.queryCache[request.analysisId] {
			logger.Debugf("[Malware Analysis] Applying results to package: %s/%s/%s",
				pkg.Manifest.GetControlTowerSpecEcosystem(), pkg.GetName(), pkg.GetVersion())

			// Here we only enrich the package with the malware analysis result.
			// We do not make a decision based on the result.
			pkg.SetMalwareAnalysisResult(&models.MalwareAnalysisResult{
				AnalysisId:         request.analysisId,
				Report:             report.GetReport(),
				VerificationRecord: report.GetVerificationRecord(),
			})
		}
	}

	// handle decides what to do with a single result message.
	handle := func(result *analysisQueryResult) {
		request, report, queryErr := result.req, result.response, result.err

		if report != nil && queryErr == nil {
			switch report.Status {
			case malysisv1.AnalysisStatus_ANALYSIS_STATUS_FAILED:
				// The analysis explicitly failed at the service provider:
				// log it and do NOT retry the polling.
				logger.Errorf("[Malware Analysis] Analysis Id: %s failed with error: %s",
					request.analysisId, report.GetErrorMessage())

				e.wg.Done()
				return
			case malysisv1.AnalysisStatus_ANALYSIS_STATUS_COMPLETED:
				// Nothing to do, the report is applied below.
			default:
				// Any other status means the analysis is still in progress;
				// turn it into an error so that the query is retried.
				queryErr = fmt.Errorf("analysis is not completed: %s", report.Status)
			}
		}

		if queryErr != nil {
			if request.retryCount >= e.config.MaxQueryRetries {
				logger.Errorf("[Malware Analysis] Max retries exceeded for analysis: %s", request.analysisId)
				e.wg.Done()
				return
			}

			e.enqueueAnalysisForQuery(request)
			return
		}

		applyReport(request, report)
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case result, open := <-e.resultsChannel:
				if !open {
					return
				}

				handle(result)
			}
		}
	}()

	return nil
}

// startQueryWorker launches config.QueryWorkerCount goroutines that poll the
// Malysis service for analysis reports. Each worker runs until ctx is done or
// the query channel is closed. The function always returns nil.
//
// A worker takes requests from the query channel. Requests that are still in
// their backoff window are bounced to the result worker with
// errMalysisPollRetryInFuture; for all others the report is fetched and the
// response (or error) is pushed on the results channel.
func (e *malysisMalwareEnricher) startQueryWorker(ctx context.Context) error {
	// publish hands a result to the result worker. It reports false when ctx
	// is done, so the worker can exit instead of blocking on a channel that
	// is no longer drained.
	publish := func(result *analysisQueryResult) bool {
		select {
		case <-ctx.Done():
			return false
		case e.resultsChannel <- result:
			return true
		}
	}

	// fetchReport performs a single GetAnalysisReport call under its own
	// timeout. It is a separate function so that the deferred cancel runs
	// after every call rather than piling up in the worker loop.
	fetchReport := func(analysisId string) (*malysisv1.GetAnalysisReportResponse, error) {
		opCtx, cancelFn := context.WithTimeout(ctx, e.config.GrpcOperationTimeout)
		defer cancelFn()

		return e.client.GetAnalysisReport(opCtx, &malysisv1.GetAnalysisReportRequest{
			AnalysisId: analysisId,
		})
	}

	for i := 0; i < e.config.QueryWorkerCount; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case req, ok := <-e.queryChannel:
					if !ok {
						return
					}

					// If we are in the backoff period, we will not query the service
					if time.Now().Before(req.nextRetryAt) {
						logger.Debugf("[Malware Analysis] Retrying query for analysis report: %s in %s",
							req.analysisId, time.Until(req.nextRetryAt))

						if !publish(&analysisQueryResult{
							req: req,
							err: errMalysisPollRetryInFuture,
						}) {
							return
						}

						continue
					}

					res, err := fetchReport(req.analysisId)

					if !publish(&analysisQueryResult{
						req:      req,
						response: res,
						err:      err,
					}) {
						return
					}
				}
			}
		}()
	}

	return nil
}
